a tool for shared writing and social publishing
1import { mutations } from "src/replicache/mutations"; 2import { eq, sql } from "drizzle-orm"; 3import { permission_token_rights, replicache_clients } from "drizzle/schema"; 4import { getClientGroup } from "src/replicache/utils"; 5import { makeRoute } from "../lib"; 6import { z } from "zod"; 7import type { Env } from "./route"; 8import { cachedServerMutationContext } from "src/replicache/cachedServerMutationContext"; 9import { drizzle } from "drizzle-orm/node-postgres"; 10import { pool } from "supabase/pool"; 11 12const mutationV0Schema = z.object({ 13 id: z.number(), 14 name: z.string(), 15 args: z.unknown(), 16 timestamp: z.number(), 17}); 18 19const mutationV1Schema = mutationV0Schema.extend({ 20 clientID: z.string(), 21}); 22 23const pushRequestV0Schema = z.object({ 24 pushVersion: z.literal(0), 25 schemaVersion: z.string(), 26 profileID: z.string(), 27 clientID: z.string(), 28 mutations: z.array(mutationV0Schema), 29}); 30 31const pushRequestV1Schema = z.object({ 32 pushVersion: z.literal(1), 33 schemaVersion: z.string(), 34 profileID: z.string(), 35 clientGroupID: z.string(), 36 mutations: z.array(mutationV1Schema), 37}); 38 39// Combine both versions into final PushRequest schema 40const pushRequestSchema = z.discriminatedUnion("pushVersion", [ 41 pushRequestV0Schema, 42 pushRequestV1Schema, 43]); 44 45type PushRequestZ = z.infer<typeof pushRequestSchema>; 46 47export const push = makeRoute({ 48 route: "push", 49 input: z.object({ 50 pushRequest: pushRequestSchema, 51 rootEntity: z.string(), 52 token: z.object({ id: z.string() }), 53 }), 54 handler: async ({ pushRequest, rootEntity, token }, { supabase }: Env) => { 55 if (pushRequest.pushVersion !== 1) { 56 return { 57 result: { error: "VersionNotSupported", versionType: "push" } as const, 58 }; 59 } 60 let timeWaitingForLock: number; 61 let timeWaitingForDbConnection: number; 62 let timeProcessingMutations: number = 0; 63 let timeGettingClientGroup: number = 0; 64 let timeGettingTokenRights: number = 0; 65 let timeFlushingContext: number = 0; 66 let timeUpdatingLastMutations: number = 0; 67 let mutationTimings: Array<{ 68 name: string; 69 duration: number; 70 }> = []; 71 72 let start = performance.now(); 73 let client = await pool.connect(); 74 timeWaitingForDbConnection = performance.now() - start; 75 start = performance.now(); 76 const db = drizzle(client); 77 let channel = supabase.channel(`rootEntity:${rootEntity}`); 78 timeWaitingForLock = performance.now() - start; 79 start = performance.now(); 80 try { 81 await db.transaction(async (tx) => { 82 const tokenHash = token.id.split("").reduce((acc, char) => { 83 return ((acc << 5) - acc + char.charCodeAt(0)) | 0; 84 }, 0); 85 86 await tx.execute(sql`SELECT pg_advisory_xact_lock(${tokenHash})`); 87 88 let clientGroupStart = performance.now(); 89 let clientGroup = await getClientGroup(tx, pushRequest.clientGroupID); 90 timeGettingClientGroup = performance.now() - clientGroupStart; 91 92 let tokenRightsStart = performance.now(); 93 let token_rights = await tx 94 .select() 95 .from(permission_token_rights) 96 .where(eq(permission_token_rights.token, token.id)); 97 timeGettingTokenRights = performance.now() - tokenRightsStart; 98 let { getContext, flush } = cachedServerMutationContext( 99 tx, 100 token.id, 101 token_rights, 102 ); 103 104 let lastMutations = new Map<string, number>(); 105 console.log(`Processing mutations on ${token.id}`); 106 console.log( 107 `Processing ${pushRequest.mutations.length} mutations:`, 108 pushRequest.mutations.map((m) => m.name), 109 ); 110 111 for (let mutation of pushRequest.mutations) { 112 let lastMutationID = clientGroup[mutation.clientID] || 0; 113 if (mutation.id <= lastMutationID) continue; 114 115 clientGroup[mutation.clientID] = mutation.id; 116 let name = mutation.name as keyof typeof mutations; 117 if (!mutations[name]) { 118 continue; 119 } 120 121 let mutationStart = performance.now(); 122 try { 123 let ctx = getContext(mutation.clientID, mutation.id); 124 await mutations[name](mutation.args as any, ctx); 125 let mutationDuration = performance.now() - mutationStart; 126 mutationTimings.push({ 127 name: mutation.name, 128 duration: mutationDuration, 129 }); 130 } catch (e) { 131 let mutationDuration = performance.now() - mutationStart; 132 mutationTimings.push({ 133 name: mutation.name, 134 duration: mutationDuration, 135 }); 136 console.log( 137 `Error occurred while running mutation: ${name} after ${mutationDuration.toFixed(2)}ms`, 138 JSON.stringify(e), 139 JSON.stringify(mutation, null, 2), 140 ); 141 } 142 lastMutations.set(mutation.clientID, mutation.id); 143 } 144 145 let dbUpdateStart = performance.now(); 146 let lastMutationIdsUpdate = Array.from(lastMutations.entries()).map( 147 (entries) => ({ 148 client_group: pushRequest.clientGroupID, 149 client_id: entries[0], 150 last_mutation: entries[1], 151 }), 152 ); 153 console.log("lastMutationIdsUpdate:", lastMutationIdsUpdate); 154 if (lastMutationIdsUpdate.length > 0) 155 await tx 156 .insert(replicache_clients) 157 .values(lastMutationIdsUpdate) 158 .onConflictDoUpdate({ 159 target: replicache_clients.client_id, 160 set: { last_mutation: sql`excluded.last_mutation` }, 161 }); 162 timeUpdatingLastMutations = performance.now() - dbUpdateStart; 163 164 let flushStart = performance.now(); 165 await flush(); 166 timeFlushingContext = performance.now() - flushStart; 167 }); 168 timeProcessingMutations = performance.now() - start; 169 170 await channel.send({ 171 type: "broadcast", 172 event: "poke", 173 payload: { message: "poke" }, 174 }); 175 } catch (e) { 176 timeProcessingMutations = performance.now() - start; 177 console.log(e); 178 } finally { 179 // Calculate mutation statistics 180 let totalMutationTime = mutationTimings.reduce( 181 (sum, m) => sum + m.duration, 182 0, 183 ); 184 185 console.log(` 186Push Request Performance Summary (${timeProcessingMutations.toFixed(2)}ms): 187================================ 188Total Elapsed Time: ${timeProcessingMutations.toFixed(2)}ms 189Time Waiting for DB Connection: ${timeWaitingForDbConnection.toFixed(2)}ms 190Time Waiting For Lock: ${timeWaitingForLock.toFixed(2)}ms 191Time Getting Client Group: ${timeGettingClientGroup.toFixed(2)}ms 192Time Getting Token Rights: ${timeGettingTokenRights.toFixed(2)}ms 193Time Updating Last Mutations: ${timeUpdatingLastMutations.toFixed(2)}ms 194Time Flushing Context: ${timeFlushingContext.toFixed(2)}ms 195 196Mutation Statistics: 197=================== 198Total Mutations Processed: ${mutationTimings.length} 199Total Mutation Execution Time: ${totalMutationTime.toFixed(2)}ms 200Average Mutation Time: ${mutationTimings.length > 0 ? (totalMutationTime / mutationTimings.length).toFixed(2) : "0.00"}ms 201 202Slowest Mutations: 203${mutationTimings 204 .sort((a, b) => b.duration - a.duration) 205 .slice(0, 5) 206 .map((m) => ` ${m.name}: ${m.duration.toFixed(2)}ms`) 207 .join("\n")} 208 `); 209 210 client.release(); 211 await supabase.removeChannel(channel); 212 return { result: undefined } as const; 213 } 214 }, 215});